feat(feature-processor): Add Lake Formation credential vending and Spark 3.5/Python 3.12 support#5816

Open
BassemHalim wants to merge 23 commits into aws:master from BassemHalim:feat/feature-store-fp-lf

Conversation


BassemHalim (Contributor) commented Apr 30, 2026

This PR upgrades the Feature Processor module to support PySpark 3.5 and Python 3.12, adds Lake Formation credential vending for data ingestion, and improves Spark version handling across the board.

Description of changes:

  • Spark version upgrade: Bumps PySpark from 3.3.2 to 3.5.1 and sagemaker-feature-store-pyspark from 3.3 to 2.0.0 in
    pyproject.toml.
  • Python 3.12 support: Extends the allowed Python versions for Spark remote jobs from [3.9] to [3.9, 3.12] in
    sagemaker-core/job.py.
  • Auto-detect PySpark version: _get_default_spark_image now detects the installed PySpark version at runtime instead of using a
    hardcoded default.
  • Auto-inject Feature Store PySpark dependency: When spark_config is set, _JobSettings.__init__ automatically appends a pip
    install sagemaker-feature-store-pyspark command and a JAR copy command to pre_execution_commands.
  • New _image_resolver.py module: Introduces _get_spark_image_uri() with a SPARK_IMAGE_SUPPORT_MATRIX that maps Spark versions
    to supported Python versions, replacing the hardcoded logic in feature_scheduler.py.
  • Dynamic Hadoop version resolution: New SPARK_TO_HADOOP_MAP and _get_hadoop_version() in _spark_factory.py resolve the correct
    Hadoop Maven coordinates based on the installed PySpark version.
  • Lazy imports for feature_store_pyspark: Moves feature_store_pyspark imports from module-level to inside methods, preventing
    import errors when the package isn't installed.
  • Feature Store JARs always on classpath: spark.jars config now includes version-matched Feature Store JARs for both training
    and non-training jobs. A new _install_feature_store_jars() method copies JARs to /usr/lib/spark/jars/.
  • Lake Formation credential vending: Adds use_lake_formation_credentials parameter to @feature_processor decorator, threaded
    through FeatureProcessorConfig -> _udf_output_receiver.ingest_udf_output() -> FeatureStoreManager.ingest_data.
  • ECDSA signing key generation: _config_uploader._prepare_and_upload_callable() now generates an ECDSA key pair, passes the
    private key to StoredFunction, and returns the public key PEM. The public key is set as REMOTE_FUNCTION_SECRET_KEY environment
    variable on the ModelTrainer.
  • Conditional image_uri in scheduler: _get_remote_decorator_config_from_input now only sets image_uri if one isn't already
    provided, allowing user-specified images.
  • Updated pre_execution_commands: Integration test helper uses python3 -m pip and python3 -m awscli patterns, installs awscli
    explicitly, and installs mlops_whl instead of sagemaker_whl with [feature-processor] extras.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Add configurable use_lake_formation_credentials parameter to the
@feature_processor decorator, defaulting to False. The value flows
through FeatureProcessorConfig to the Spark connector's ingest_data()
call, enabling Lake Formation credential vending when set to True.

---
X-AI-Prompt: make useLakeFormationCreds configurable, defaults to False, passed to feature_processor
X-AI-Tool: kiro-cli
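
The flag-threading described in this commit can be illustrated with a schematic stand-in (not the real SDK types; `FeatureProcessorConfig` in the SDK carries many more fields, and the real decorator does far more than attach a config attribute):

```python
from dataclasses import dataclass

# Toy stand-in for the SDK's FeatureProcessorConfig.
@dataclass
class FeatureProcessorConfig:
    use_lake_formation_credentials: bool = False  # new field, defaults to False

def feature_processor(use_lake_formation_credentials: bool = False):
    """Toy decorator showing how the flag is captured in the config."""
    config = FeatureProcessorConfig(
        use_lake_formation_credentials=use_lake_formation_credentials
    )
    def wrap(udf):
        # In the SDK, this config is later read on the ingestion path
        # (ingest_udf_output -> FeatureStoreManager.ingest_data).
        udf.feature_processor_config = config
        return udf
    return wrap

@feature_processor(use_lake_formation_credentials=True)
def transform(df):
    return df
```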
Generate ECDSA signing key in ConfigUploader and pass it to
StoredFunction for function payload signature verification. The
public key PEM is returned to callers for remote-side verification.

---
X-AI-Prompt: fix StoredFunction missing signing_key error in feature_processor pipeline
X-AI-Tool: kiro-cli
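
The sign-on-upload / verify-on-remote flow might look like the sketch below, assuming the widely used `cryptography` package; the specific curve (SECP256R1) and hash (SHA-256) are illustrative choices not confirmed by the PR:

```python
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec

def generate_signing_key_pair():
    """Generate an ECDSA key pair; return (private_key, public_key_pem)."""
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode("utf-8")
    return private_key, public_pem

# Client side: sign the serialized function payload with the private key.
private_key, public_pem = generate_signing_key_pair()
payload = b"pickled-function-bytes"
signature = private_key.sign(payload, ec.ECDSA(hashes.SHA256()))

# Remote side: verify using the PEM delivered via REMOTE_FUNCTION_SECRET_KEY.
remote_key = serialization.load_pem_public_key(public_pem.encode("utf-8"))
remote_key.verify(signature, payload, ec.ECDSA(hashes.SHA256()))  # raises on mismatch
```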
Add _image_resolver module that resolves the SageMaker Spark
processing container image URI based on installed PySpark and Python
versions. Supports Spark 3.1/3.2/3.3/3.5 with appropriate Python
version mapping. Uses container_version=v1 as a floating tag.

---
X-AI-Prompt: add image resolver with container_version v1 for spark processing image
X-AI-Tool: kiro-cli
…cheduler

Update feature_scheduler to use _get_spark_image_uri for dynamic
image resolution instead of _JobSettings._get_default_spark_image.
Thread public_key_pem from ConfigUploader through to ModelTrainer
environment as REMOTE_FUNCTION_SECRET_KEY. Allow user-provided
image_uri to take precedence over auto-resolved URI.

---
X-AI-Prompt: integrate image resolver and signing key into feature scheduler pipeline
X-AI-Tool: kiro-cli
…Store JARs

Resolve Hadoop version dynamically based on installed PySpark version
instead of hardcoding 3.3.1. Move Feature Store JAR classpath setup
outside the non-training-job guard so spark.jars is always set,
fixing FeatureStoreManager class loading in training job mode.

---
X-AI-Prompt: fix spark factory hadoop version and jar classpath for spark 3.5
X-AI-Tool: kiro-cli
Update _get_default_spark_image to accept Python 3.12 in addition to
3.9. Auto-detect Spark version from installed pyspark instead of
hardcoding 3.3, falling back to the default if pyspark is not
installed. Also resolve correct Python binary in Spark bootstrap
script to avoid PATH conflicts with system python3.

---
X-AI-Prompt: fix job.py to select correct spark image for py312 and detect pyspark version
X-AI-Tool: kiro-cli
…on 3.12

Update expected error message in remote function tests to reflect that
SageMaker Spark images now support Python versions 3.9 and 3.12.
…or deps

Pin pyspark==3.5.1 in both feature-processor and test optional
dependencies to ensure consistent Spark version across environments.
…ersions

SageMaker Spark image only supports Python 3.9 and 3.12. Add skipif
markers to three feature processor integ tests that fail on Python 3.10.
Inject sagemaker-feature-store-pyspark>=2,<3 via pre_execution_commands
in _get_remote_decorator_config_from_input so it gets installed on the
remote container automatically.

Update integ tests: add skipif for Python 3.10 Spark tests, remove
manual feature-store-pyspark install, use python3 instead of python3.12.
… feature-store-pyspark

- Update test error messages to reflect Python 3.9 and 3.12 support
- Add pyspark 3.5.1 to test and feature-processor optional deps
- Skip Spark integ tests on unsupported Python versions (3.10)
- Auto-install sagemaker-feature-store-pyspark>=2,<3 via pre_execution_commands
  in to_pipeline and copy version-matched JAR to Spark classpath
- Use standard SageMaker Spark image resolution via SparkConfig
- Use python3 instead of python3.12 in integ test pre_execution_commands
… remote jobs

When spark_config is set on a remote job, _JobSettings now automatically
injects pip install of sagemaker-feature-store-pyspark and copies the
Spark 3.5-matched JAR to /usr/lib/spark/jars/ via pre_execution_commands.

This makes the package work transparently when the SageMaker Spark image
does not pre-install sagemaker-feature-store-pyspark.

- Make feature_store_pyspark imports lazy in _spark_factory.py to avoid
  deserialization failures when the module is not yet installed
- Add sagemaker-feature-store-pyspark to integ test requirements.txt
- Remove duplicate injection from feature_scheduler.py (to_pipeline path)
  since _JobSettings now handles all Spark remote jobs
…e_execution_commands

_JobSettings now auto-injects feature-store-pyspark install and JAR copy
commands when spark_config is set, so update the test assertion to
expect these commands in the _prepare_and_upload_workspace call.
…or Spark 3.5

- Replace feature_store_pyspark.classpath_jars() with glob pattern matching for *3.5*.jar files
- Add os and glob imports to the JAR copy command
- Construct jars_dir path dynamically from feature_store_pyspark module location
- Update test expectations to match the new JAR copy implementation
- Ensures compatibility with Spark 3.5 by explicitly targeting versioned JAR files
BassemHalim changed the title from "Feat/feature store fp lf" to "feat(feature-processor): Add Lake Formation credential vending and Spark 3.5/Python 3.12 support" on May 4, 2026
… gen, and JAR install

Cover untested changes from PR aws#5816:
- FeatureProcessorConfig.use_lake_formation_credentials field (default + enabled)
- @feature_processor decorator threading of use_lake_formation_credentials
- _config_uploader ECDSA key generation and signing_key passthrough
- _install_feature_store_jars skip and version-matched copy paths
- Update create_fp_config helper to accept use_lake_formation_credentials
---
X-AI-Prompt: add unit tests for untested changes in PR aws#5816
X-AI-Tool: kiro-cli
… Spark image

Verify _get_default_spark_image reads pyspark.__version__ and passes
the major.minor version (e.g. 3.5) to image_uris.retrieve.
---
X-AI-Prompt: add unit tests for untested changes in PR aws#5816
X-AI-Tool: kiro-cli
```python
"python3 -c \"import feature_store_pyspark, shutil, os, glob; "
"jars_dir = os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars'); "
"[shutil.copy(j, '/usr/lib/spark/jars/') "
"for j in glob.glob(os.path.join(jars_dir, '*3.5*.jar'))]\""
```
A reviewer (Contributor) commented on the injected command:
This is injected into pre_execution_commands for any Spark job, but the glob *3.5*.jar is hardcoded to Spark 3.5. If a user has Spark 3.3 installed, this command will silently copy nothing (no matching JARs) and the Feature Store JAR won't be on the classpath. The dynamic version detection already done in _spark_factory.py (spark_version = ".".join(pyspark.__version__.split(".")[:2])) should be used here too. Since this runs at _JobSettings.__init__ time on the client side, pyspark.__version__ is available, so the glob should use the detected version, not a hardcoded 3.5.

- Use patch.dict(sys.modules) to mock pyspark instead of importing it
  directly, so the test runs without pyspark installed
- Remove debug print(cmds) from integ test helper
---
X-AI-Prompt: mock pyspark in unit test so it doesn't depend on pyspark being installed
X-AI-Tool: kiro-cli
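
The sys.modules patching this commit describes follows a standard pattern; the sketch below uses a plain `types.ModuleType` stand-in (the actual test may use a MagicMock instead):

```python
import sys
import types
from unittest.mock import patch

# Stand-in pyspark module so code under test sees a version string
# without the real package being installed.
fake_pyspark = types.ModuleType("pyspark")
fake_pyspark.__version__ = "3.5.1"

with patch.dict(sys.modules, {"pyspark": fake_pyspark}):
    import pyspark  # resolves to the fake module inside the patch
    detected = ".".join(pyspark.__version__.split(".")[:2])

# patch.dict restores sys.modules to its prior state on exit.
```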
…e resolver

- Move pyspark import from module level to function scope in _get_spark_image_uri
- Prevents unnecessary import at module initialization
- Reduces import overhead when image resolver is loaded but Spark functionality not used